Skip to content

feat(connectors): add S3 sink connector#3103

Open
atharvalade wants to merge 5 commits into
apache:masterfrom
atharvalade:feat/s3-sink-connector
Open

feat(connectors): add S3 sink connector#3103
atharvalade wants to merge 5 commits into
apache:masterfrom
atharvalade:feat/s3-sink-connector

Conversation

@atharvalade
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2956

Rationale

Iggy lacks a native way to write stream messages to Amazon S3 and S3-compatible stores (MinIO, Cloudflare R2, Backblaze B2, DigitalOcean Spaces). This is a frequently requested capability for data lake ingestion and long-term archival pipelines.

What changed?

There was no connector for persisting Iggy messages to object storage. Users had to build custom consumers and upload logic to get data into S3.

This PR adds a new iggy_connector_s3_sink crate that implements the Sink trait. It buffers messages in-memory per stream/topic/partition, rotates files by size or message count, renders S3 keys from a configurable path template ({stream}/{topic}/{date}/{hour}/...), and uploads with retry + exponential backoff. Supports json_lines, json_array, and raw output formats with optional Iggy metadata and header embedding. Uses rust-s3 (already in workspace) with path-style addressing auto-enabled for custom endpoints.

Key implementation details:

  • 6 source modules: lib.rs (config + entry point), client.rs (S3 client init + bucket verification), buffer.rs (in-memory accumulation + rotation logic), formatter.rs (JSON/raw output + metadata/header inclusion), path.rs (template engine for S3 keys with offset-based filenames), sink.rs (Sink trait: open/consume/close lifecycle)
  • 36 unit tests covering config deserialization, buffer rotation, path template rendering, all output formats, credential validation, and edge cases
  • CI integration: added to _build_rust_artifacts.yml and edge-release.yml for cdylib plugin builds and release notes
  • Error handling: warnings logged on invalid config fallbacks, explicit buffer management on upload failure, close() warns if S3 client was never initialized
  • End-to-end tested locally with MinIO in Docker, Iggy server, CLI producer, and connector runtime — verified messages flow from Iggy stream into S3 bucket as properly formatted JSON

Local Execution

  • Passed
  • Pre-commit hooks ran
  • Full CI checklist passed locally:
    • cargo fmt --check -- pass
    • cargo clippy --tests -D warnings -- pass (zero warnings)
    • cargo test -p iggy_connector_s3_sink -- 36/36 pass
    • markdownlint --check -- pass
    • trailing-whitespace -- pass
    • trailing-newline -- pass
    • license-headers -- pass

AI Usage

  1. Opus 4.6
  2. used for scaffolding boilerplate and initial file structure, all logic was reviewed and iterated manually
  3. Verified through full local compilation, 36 unit tests, clippy with -D warnings, and end-to-end testing with MinIO Docker + Iggy server + CLI producer + connector runtime
  4. Yes

Here are all the relevant screenshots:

  • MinIO Docker container running and accessible at localhost:9000
  • MinIO web console showing the created iggy-test bucket
  • Iggy server started with root credentials configured
  • Iggy CLI creating stream application_logs and topic api_requests
  • Iggy CLI sending test messages to the topic
  • Connector runtime loading the S3 sink plugin and connecting to MinIO
  • Connector runtime consuming messages and uploading to S3
  • MinIO console showing the uploaded .jsonl file in the correct path structure (application_logs/api_requests/{date}/{hour}/)
  • Contents of the uploaded file showing properly formatted JSON lines with metadata (offset, timestamp, stream, topic, partition_id, payload)
  • All 36 unit tests passing
  • cargo clippy --tests -D warnings passing with zero warnings
Screenshot 2026-04-13 at 1 37 24 AM Screenshot 2026-04-13 at 1 36 38 AM Screenshot 2026-04-13 at 1 36 30 AM Screenshot 2026-04-13 at 1 36 12 AM Screenshot 2026-04-13 at 1 35 34 AM Screenshot 2026-04-13 at 1 28 47 AM Screenshot 2026-04-13 at 1 28 25 AM Screenshot 2026-04-13 at 1 28 16 AM Screenshot 2026-04-13 at 1 28 06 AM Screenshot 2026-04-13 at 1 27 52 AM

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 13, 2026

Codecov Report

❌ Patch coverage is 76.84070% with 173 lines in your changes missing coverage. Please review.
✅ Project coverage is 19.27%. Comparing base (eb20ac5) to head (980d64d).
⚠️ Report is 58 commits behind head on master.

Files with missing lines Patch % Lines
core/connectors/sinks/s3_sink/src/sink.rs 0.00% 73 Missing ⚠️
core/connectors/sinks/s3_sink/src/lib.rs 78.19% 41 Missing ⚠️
core/connectors/sinks/s3_sink/src/client.rs 62.88% 36 Missing ⚠️
core/connectors/sinks/s3_sink/src/formatter.rs 89.94% 18 Missing and 2 partials ⚠️
core/connectors/sinks/s3_sink/src/buffer.rs 97.00% 3 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3103       +/-   ##
=============================================
- Coverage     74.08%   19.27%   -54.82%     
  Complexity      943      943               
=============================================
  Files          1159     1163        +4     
  Lines        102033    91064    -10969     
  Branches      79084    68132    -10952     
=============================================
- Hits          75593    17550    -58043     
- Misses        23770    73103    +49333     
+ Partials       2670      411     -2259     
Components Coverage Δ
Rust Core 1.23% <76.84%> (-74.08%) ⬇️
Java SDK 60.14% <ø> (ø)
C# SDK 69.07% <ø> (-0.31%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.53% <ø> (ø)
Go SDK 39.43% <ø> (ø)
Files with missing lines Coverage Δ
core/connectors/sinks/s3_sink/src/path.rs 100.00% <100.00%> (ø)
core/connectors/sinks/s3_sink/src/buffer.rs 97.00% <97.00%> (ø)
core/connectors/sinks/s3_sink/src/formatter.rs 89.94% <89.94%> (ø)
core/connectors/sinks/s3_sink/src/client.rs 62.88% <62.88%> (ø)
core/connectors/sinks/s3_sink/src/lib.rs 78.19% <78.19%> (ø)
core/connectors/sinks/s3_sink/src/sink.rs 0.00% <0.00%> (ø)

... and 657 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added the S-stale Inactive issue or pull request label Apr 21, 2026
Copy link
Copy Markdown
Contributor

@slbotbm slbotbm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments.
Also, do you plan to support using parquet files as well in the future?

Comment on lines +169 to +194
"S3 sink ID: {id} invalid output_format '{}': {e}, defaulting to json_lines",
config.output_format,
);
OutputFormat::JsonLines
}
};

let max_file_size_bytes = match parse_file_size(&config.max_file_size) {
Ok(size) => size,
Err(e) => {
tracing::warn!(
"S3 sink ID: {id} invalid max_file_size '{}': {e}, defaulting to 8 MiB",
config.max_file_size,
);
8 * 1024 * 1024
}
};

let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY);
let retry_delay = match humantime::Duration::from_str(delay_str) {
Ok(d) => d.into(),
Err(e) => {
tracing::warn!(
"S3 sink ID: {id} invalid retry_delay '{delay_str}': {e}, defaulting to 1s",
);
std::time::Duration::from_secs(1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, if an error occurs, the connector picks the default option, which may not be something that all users want. Would it be better to gate using the default options behind some kind of flag?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll change these to return hard errors on invalid config

Comment on lines +86 to +87

let max_messages = self.config.max_messages_per_file.unwrap_or(u64::MAX);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone chooses number of messages for file rotation and forgets to define max_messages_per_file, max messages will be set to u64::MAX, which means that processes other than the connector will force the flush to s3, which I believe is not an ideal situation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a validation in open() that returns an error if file_rotation = "messages" but max_messages_per_file is not set. do you think this is a good approach?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a good approach. In addition, how about erroring hard in let max_messages = self.config.max_messages_per_file.unwrap_or(u64::MAX); instead of passing u64::MAX as a fallback?

Comment on lines +26 to +38
pub fn validate_credentials(config: &S3SinkConfig) -> Result<(), Error> {
match (&config.access_key_id, &config.secret_access_key) {
(Some(_), Some(_)) | (None, None) => Ok(()),
_ => Err(Error::InvalidConfigValue(
"Partially configured credentials. You must provide both access_key_id \
and secret_access_key, or omit both."
.to_owned(),
)),
}
}

pub async fn create_bucket(config: &S3SinkConfig) -> Result<Box<Bucket>, Error> {
validate_credentials(config)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to inline this function? As far as I can see, it is being used in only one place. You could add a comment instead saying that this piece of code validates the creds.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah fair enough

Comment on lines +117 to +126
pub fn from_str_config(s: &str) -> Result<Self, Error> {
match s.to_lowercase().as_str() {
"json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines),
"json_array" | "json" => Ok(OutputFormat::JsonArray),
"raw" => Ok(OutputFormat::Raw),
other => Err(Error::InvalidConfigValue(format!(
"Unknown output format: '{other}'. Expected: json_lines, json_array, or raw"
))),
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't using From for OutputFormat make more sense here? (though the current version also works just fine)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll implement TryFrom<&str> for OutputFormat which is more idiomatic

@slbotbm
Copy link
Copy Markdown
Contributor

slbotbm commented Apr 21, 2026

I also feel data loss due to maximum retries being exceeded should be mentioned in readme.md as a precaution.

@github-actions github-actions Bot removed the S-stale Inactive issue or pull request label Apr 22, 2026
@atharvalade atharvalade force-pushed the feat/s3-sink-connector branch from 96ed8d1 to 87e0cc0 Compare April 24, 2026 15:16
@atharvalade
Copy link
Copy Markdown
Contributor Author

I left some comments. Also, do you plan to support using parquet files as well in the future?

oh yes absolutely.. parquet support is on the roadmap as a future output_format option

@atharvalade
Copy link
Copy Markdown
Contributor Author

I also feel data loss due to maximum retries being exceeded should be mentioned in readme.md as a precaution.

I agree, I'll add that

@atharvalade atharvalade force-pushed the feat/s3-sink-connector branch from 0a37619 to 00fc2df Compare April 28, 2026 17:34
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 6, 2026

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added S-stale Inactive issue or pull request and removed S-stale Inactive issue or pull request labels May 6, 2026
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added the S-stale Inactive issue or pull request label May 14, 2026
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-stale Inactive issue or pull request labels May 14, 2026
- Buffered uploads with configurable file rotation (by size or message count)
- Multiple output formats: JSON Lines, JSON Array, Raw
- Configurable path templates with variables for stream, topic, date, hour, partition
- Deterministic S3 keys based on offset ranges for idempotent crash recovery
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"deterministic S3 keys ... idempotent crash recovery" is false on three grounds:

either remove the claim, or remove {timestamp} from the template and document the in-memory loss path honestly.

- Deterministic S3 keys based on offset ranges for idempotent crash recovery
- Optional metadata and header inclusion in output
- Support for custom endpoints (MinIO, R2) and path-style addressing
- Retry with exponential backoff on upload failures
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"retry with exponential backoff" - code at sink.rs:260 is retry_delay * attempts, which is linear (1s, 2s, 3s). either update the doc to "linear backoff" or implement retry_delay * 2u32.pow(attempts - 1) with jitter. also AFAIR there is backoff in connectors SDK, please check it.


## Data Delivery Guarantees

This connector provides **at-least-once** delivery under normal operation. However, **data loss can occur** if all upload retries are exhausted (controlled by `max_retries`). When an upload fails after all retry attempts, the affected messages are dropped and an error is logged. Monitor your connector logs for `failed to upload` errors in production. Increase `max_retries` and `retry_delay` if transient S3 failures are common in your environment.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this paragraph claims at-least-once and then admits "data loss can occur" in the same sentence - that is self-contradictory. given #2927 + #2928, no sink connector can deliver at-least-once today. the canonical in-tree wording is http_sink/README.md:790-800 which honestly documents at-most-once and cites both bugs. recommend copying that section verbatim.

chrono = { workspace = true }
dashmap = { workspace = true }
humantime = { workspace = true }
iggy_common = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iggy_common is declared under [dependencies] but the only usage in this crate is formatter.rs:233-235 under #[cfg(test)]. move it to [dev-dependencies] so it does not bloat the cdylib build.

# under the License.

[package]
name = "iggy_connector_s3_sink"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing publish = false. every other connector sink (postgres_sink, delta_sink, http_sink, elasticsearch_sink, stdout_sink) declares publish = false because they are cdylib plugins not meant for crates.io. without it the crate would publish on the next workspace release.

}

let mut state = self.state.lock().await;
state.messages_processed += messages.len() as u64;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.messages_processed += messages.len() as u64 runs unconditionally outside the rotate loop, so when a mid-batch flush dropped N messages this counter still claims it processed them. upload_errors increments separately at :209. result: the close-log at :155-156 reports more messages processed than actually landed in S3. either decrement on drop or split into messages_buffered / messages_uploaded / messages_lost.

// Reset buffer even on failure to prevent unbounded growth.
// Messages are lost but offsets will be re-delivered by the
// runtime on next poll since consume() returned Ok.
buffer.reset();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the worst data-loss path in the PR. on retry-exhaust the failure branch logs an error and then calls buffer.reset() at :214 to drop the messages, while consume at :126 still returns Ok(()). the comment at :211-213 claims "offsets will be re-delivered by the runtime on next poll since consume() returned Ok" - that is doubly false:

net result: a single transient S3 hiccup that exhausts max_retries (default 3) permanently loses every buffered message. the README at line 139 acknowledges this but still claims at-least-once on the same line.

minimum: drop the false comment, propagate Err to the runtime, and align the README with http_sink/README.md:790-800 (at-most-once + cite #2927 / #2928).

return Ok(());
}
attempts += 1;
if attempts >= max_retries {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempts starts at 0 and increments before the >= max_retries check, so max_retries = 3 yields 3 total attempts (2 retries past the initial one). this matches the postgres_sink pattern but the field name is misleading. either rename to max_attempts or use attempts > max_retries so the field name lines up with semantics.

let mut attempts = 0u32;

loop {
match bucket.put_object(s3_key, data).await {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the retry loop treats every non-2xx status uniformly. AWS permanent failures - AccessDenied (403), NoSuchBucket (404), InvalidBucketName (400), MalformedPolicy - get retried 3 times, wasting retry_delay * (1 + 2) = 3s before the final Err. classify retriable vs not: 5xx + 408 + 429 + 503 SlowDown -> retry; other 4xx -> fail fast.

);
}
}
tokio::time::sleep(retry_delay * attempts).await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry_delay * attempts is linear backoff (1s, 2s, 3s), but the README at line 13 advertises exponential. either implement retry_delay * 2u32.pow(attempts - 1) with jitter, or update the doc to "linear".

separately, Duration::Mul<u32> panics on overflow - default config is safe but pathological values (e.g. retry_delay = "1h" + large max_retries) would panic. saturating_mul is cheap defensive practice.

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 20, 2026

/author

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels May 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Amazon S3 Sink Connector

3 participants